/*
 * Copyright 2013-16 Board of Trustees of Stanford University
 * Copyright 2013-16 Ecole Polytechnique Federale Lausanne (EPFL)
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#include <ixev.h>
#include <mempool.h>

/*
 * Round num up to the next multiple of `multiple` using integer arithmetic
 * (intended for non-negative operands). Both arguments are expanded more
 * than once, so do not pass expressions with side effects.
 */
#define ROUND_UP(num, multiple) ((((num) + (multiple) - 1) / (multiple)) * (multiple))

/*
 * Per-connection state. Each object is allocated from pp_conn_pool with
 * msg_size extra bytes behind it, which back the flexible `data` array.
 *
 * The three offsets index into `data`; the handlers subtract them from one
 * another as unsigned values, so they are expected to satisfy
 * w_offset <= p_offset <= r_offset <= msg_size.
 */
struct pp_conn {
	/* event context; container_of() maps it back to this struct */
	struct ixev_ctx ctx;
	/* bytes still to be sent by pp_stream_handler() for the current message */
	size_t bytes_left;
	/* write offset: data[0 .. w_offset) has already been sent back */
	size_t w_offset;
	/* read offset: data[0 .. r_offset) holds received bytes */
	size_t r_offset;
	/* process offset: data[0 .. p_offset) has been accepted by process_data() */
	size_t p_offset;
	/* receive/echo buffer of msg_size bytes */
	char data[];
};

/*
 * Fixed header that precedes every message on the wire.
 * The trailing comments give "size, +offset" in bytes for each field.
 */
typedef struct _TCPheader {
	uint8_t type; // 1, +0: one of header_type
	uint8_t src_ip;  // 1, +1
	uint16_t port; // 2, +2
	uint16_t num_msg; // 2, +4
	uint16_t len_msg; // 2, +6: payload length; read with ntohs() in process_data()
	//uint32_t ack; // 4 +8 (disabled field)
} TCPheader; // the active fields add up to 8 bytes (12 only if `ack` were enabled)

/*
 * Values carried in TCPheader.type.
 * process_data() rewrites S_REQUEST to S_RESPONSE before the message is
 * echoed, and skips over CONNECT headers without modifying them.
 */
typedef enum _header_type {
	S_REQUEST  = 0x60,
	S_RESPONSE = 0x10,
	/* NOTE(review): decimal 40 (0x28), unlike the hex siblings -- confirm against the client */
	CONNECT = 40
} header_type;

/* Per-thread statistics bookkeeping; the single instance is `tctx`. */
struct thread_ctx {
	/* worker index, copied from the thread argument in pp_main() */
	int id;
	/* message-count threshold that triggers an early report in print_stats() */
	int print_size;
	/* requests processed so far (incremented in process_data()) */
	uint64_t cnt;
	/* value of cnt at the previous report */
	uint64_t prv_cnt;
	/* rdtsc() timestamp taken by the latest print_stats() call */
	uint64_t cur_time;
	/* rdtsc() timestamp of the previous report */
	uint64_t prv_time;
	/* bytes accepted by process_data() so far */
	uint64_t bytes;
	/* value of bytes at the previous report */
	uint64_t prv_bytes;
};
/* rdtsc() ticks per microsecond; presumably assumes a 2.4 GHz TSC -- TODO confirm for the target machine */
#define TICKS_PER_US 2400
/*
 * NOTE(review): the two macros below expand WITHOUT parentheses. Multiplying
 * by them works as expected, but dividing does not: `x / GBPS_PER_BPS`
 * parses as `(x / 1024) * 1024`. Parenthesizing them changes the value of
 * every expression that divides by them, so audit all users before doing so.
 */
#define US_PER_SECOND 1000*1000
#define GBPS_PER_BPS 1024*1024

/* Per-thread statistics state, updated by process_data() and the handlers. */
static __thread  struct thread_ctx tctx;
/* Per-thread worker index, set in pp_main() and printed by print_stats(). */
__thread int tid;

/* Size in bytes of every connection's data buffer; parsed from argv[1] in main(). */
static size_t msg_size;

/* Backing store for struct pp_conn objects, created once in main(). */
static struct mempool_datastore pp_conn_datastore;
/* Per-thread pool carved out of pp_conn_datastore in pp_main(). */
static __thread struct mempool pp_conn_pool;

/*
 * Print this thread's throughput since the previous report.
 *
 * A report is emitted when either
 *   - more than one second has elapsed since the last report (the request
 *     rate is derived from tctx.cnt), or
 *   - more than tctx.print_size messages' worth of bytes have been handled
 *     (the request rate is derived from the byte count and msg_size).
 * Otherwise the function only refreshes tctx.cur_time and returns.
 *
 * After a report all "previous" snapshots (count, time, bytes) are advanced
 * together so that the next interval starts from a consistent baseline.
 */
static void print_stats(void)
{
	uint64_t elapsed_us, delta_bytes, rps, rate;

	tctx.cur_time = rdtsc();
	elapsed_us = (tctx.cur_time - tctx.prv_time) / TICKS_PER_US;

	/* Less than a microsecond elapsed: no meaningful rate, and it would
	 * be a division by zero below. */
	if (elapsed_us == 0)
		return;

	delta_bytes = tctx.bytes - tctx.prv_bytes;

	if (elapsed_us > 1000000) {
		rps = (tctx.cnt - tctx.prv_cnt) * US_PER_SECOND / elapsed_us;
	} else if (msg_size != 0 &&
		   delta_bytes / msg_size > (uint64_t) tctx.print_size) {
		rps = delta_bytes / msg_size * US_PER_SECOND / elapsed_us;
	} else {
		return;
	}

	/* bits per microsecond is numerically megabits per second */
	rate = delta_bytes * 8 / elapsed_us;

	printf("[T]Thread: %d \trps: %llu\tMbps: %llu\n", tid,
	       (unsigned long long) rps, (unsigned long long) rate);

	tctx.prv_cnt = tctx.cnt;
	tctx.prv_time = tctx.cur_time;
	tctx.prv_bytes = tctx.bytes;
}


/* two cases:
 * 1) REQUEST: total_len <= len
 * 2) unkown */
static int process_data(char * buf, size_t len)
{
	int data_left = len;
	char *curbuf = buf;
	TCPheader *s_header;
	int len_msg;


	if(buf[0] != S_REQUEST && buf[0] != CONNECT)
		return len;
	while (data_left > sizeof(TCPheader)) {
		s_header = (TCPheader *)curbuf;
		if (s_header -> type == S_REQUEST) {
			len_msg = ntohs(s_header->len_msg);

			if (data_left - sizeof(TCPheader) >= len_msg)  {
				s_header->type = S_RESPONSE;
				data_left -= sizeof(TCPheader) + len_msg;
				curbuf += sizeof(TCPheader) + len_msg;
				tctx.cnt++;
			}
			else
				break;
		}
		else if(s_header -> type == CONNECT) {
			if (data_left < sizeof(TCPheader) ) {
				break;
			}
			else {
				data_left -= sizeof(TCPheader) ;
				curbuf += sizeof(TCPheader) ;
			}
		}
		else {
                                data_left = 0;
		}
	}
	//printf("in process_len :%d\n",len - data_left);
	return len - data_left;
}

static void pp_main_handler(struct ixev_ctx *ctx, unsigned int reason);

/*
 * Continue sending the current msg_size-byte message from conn->data,
 * resuming where the previous send stopped (tracked via conn->bytes_left).
 *
 * A negative send result other than -EAGAIN closes the connection; -EAGAIN
 * simply returns. Once the whole message is out, bytes_left is reset to
 * msg_size and the context is switched to pp_main_handler for IXEVIN.
 *
 * @ctx:    event context embedded in a struct pp_conn
 * @reason: event mask (not inspected here)
 */
static void pp_stream_handler(struct ixev_ctx *ctx, unsigned int reason)
{
	struct pp_conn *conn = container_of(ctx, struct pp_conn, ctx);
	size_t already_sent = msg_size - conn->bytes_left;
	ssize_t nsent;

	nsent = ixev_send(ctx, &conn->data[already_sent], conn->bytes_left);
	if (nsent >= 0) {
		conn->bytes_left -= nsent;
		if (conn->bytes_left == 0) {
			conn->bytes_left = msg_size;
			ixev_set_handler(ctx, IXEVIN, &pp_main_handler);
		}
		return;
	}

	if (nsent != -EAGAIN)
		ixev_close(ctx);
}

/*
 * Main echo handler: receive into conn->data, let process_data() decide how
 * much of it may be echoed, and send that part back.
 *
 * Each loop iteration
 *   1. (only when reason contains IXEVIN) compacts the buffer so that unsent
 *      bytes start at offset 0, receives into the free space and advances
 *      p_offset by what process_data() accepted;
 *   2. sends data[w_offset .. p_offset). A short or failed send re-arms the
 *      handler for IXEVIN | IXEVOUT; a complete send during an IXEVOUT
 *      wakeup re-arms it for IXEVIN only.
 *
 * The loop ends when a receive or send returns <= 0 (the connection is
 * closed unless the result is -EAGAIN), or when an iteration can make no
 * progress at all.
 *
 * @ctx:    event context embedded in a struct pp_conn
 * @reason: event mask, tested against IXEVIN and IXEVOUT
 */
static void pp_main_handler(struct ixev_ctx *ctx, unsigned int reason)
{
	struct pp_conn *conn = container_of(ctx, struct pp_conn, ctx);
	ssize_t ret;

	while (1) {
		size_t bytes_left;
		int received = 0;

		if (reason & IXEVIN) {
			if (conn->r_offset == conn->w_offset) {
				/* everything buffered was sent: rewind */
				conn->w_offset = 0;
				conn->p_offset = 0;
				conn->r_offset = 0;
			}
			else {
				/* slide the unsent bytes to the front of the buffer */
				memmove(&conn->data[0], &conn->data[conn->w_offset], conn->r_offset - conn->w_offset);
				conn->r_offset -= conn->w_offset;
				conn->p_offset -= conn->w_offset;
				conn->w_offset = 0;
			}

			bytes_left = msg_size - conn->r_offset;

			if (bytes_left != 0) {
				ret = ixev_recv(ctx, &conn->data[conn->r_offset], bytes_left);
				if (ret <= 0) {
					if (ret != -EAGAIN)
						ixev_close(ctx);
					return;
				}
				conn->r_offset += ret;
				received = 1;

				ret = process_data(&conn->data[conn->p_offset], conn->r_offset - conn->p_offset);

				conn->p_offset += ret;
				tctx.bytes += ret;
			}
		}

		bytes_left = conn->p_offset - conn->w_offset;
		if (bytes_left == 0) {
			/* new data arrived but no complete message yet: read more */
			if (received)
				continue;

			/*
			 * Nothing was received and nothing is pending, so looping
			 * again would spin forever. With IXEVIN set this means the
			 * buffer is full of data process_data() cannot consume
			 * (a message larger than msg_size): drop the connection.
			 * Otherwise the pending output has been flushed; wait for
			 * the next event.
			 */
			if (reason & IXEVIN)
				ixev_close(ctx);
			return;
		}

		ret = ixev_send(ctx, &conn->data[conn->w_offset], bytes_left);
		if (ret == bytes_left) {
			if (reason & IXEVOUT) {
				ixev_set_handler(ctx, IXEVIN, &pp_main_handler);
			}
		}
		else {
			ixev_set_handler(ctx, IXEVIN | IXEVOUT, &pp_main_handler);
		}

		if (ret <= 0) {
			if (ret != -EAGAIN)
				ixev_close(ctx);
			return;
		}

		conn->w_offset += ret;

		print_stats();
	}
}

/*
 * Accept callback: allocate and initialise the state for a new connection.
 *
 * Every connection is accepted; the peer tuple is not inspected. The new
 * connection starts with an empty buffer and pp_main_handler armed for
 * IXEVIN.
 *
 * @id: address tuple of the incoming connection (unused)
 * Returns the embedded event context, or NULL when the pool is exhausted.
 */
static struct ixev_ctx *pp_accept(struct ip_tuple *id)
{
	struct pp_conn *fresh;

	fresh = mempool_alloc(&pp_conn_pool);
	if (fresh == NULL)
		return NULL;

	/* empty buffer: nothing read, processed or written yet */
	fresh->r_offset = 0;
	fresh->p_offset = 0;
	fresh->w_offset = 0;
	fresh->bytes_left = msg_size;

	ixev_ctx_init(&fresh->ctx);
	ixev_set_handler(&fresh->ctx, IXEVIN, &pp_main_handler);

	return &fresh->ctx;
}

/*
 * Release callback: hand the struct pp_conn that embeds ctx back to this
 * thread's connection pool.
 */
static void pp_release(struct ixev_ctx *ctx)
{
	mempool_free(&pp_conn_pool, container_of(ctx, struct pp_conn, ctx));
}

/* Connection callbacks registered with ixev_init() in main(). */
static struct ixev_conn_ops pp_conn_ops = {
	.accept = pp_accept,
	.release = pp_release,
};

/*
 * Per-thread entry point: set up the thread's ixev state and connection
 * pool, record the worker index, then service events forever.
 *
 * @arg: points to this worker's integer index
 * Returns NULL if initialisation fails; otherwise it does not return.
 */
static void *pp_main(void *arg)
{
	unsigned int worker_id;
	int err;

	err = ixev_init_thread();
	if (err != 0) {
		fprintf(stderr, "unable to init IXEV\n");
		return NULL;
	}

	err = mempool_create(&pp_conn_pool, &pp_conn_datastore);
	if (err != 0) {
		fprintf(stderr, "unable to create mempool\n");
		return NULL;
	}

	worker_id = *(unsigned int *) arg;
	tctx.id = worker_id;

	printf("thread %d\n",tctx.id);
	tid = worker_id;
	tctx.print_size = 100*1000;

	/* event loop: never exits */
	for (;;)
		ixev_wait();

	return NULL;
}

/*
 * Program entry point.
 *
 * Usage: prog MSG_SIZE [MAX_CONNECTIONS]
 *   MSG_SIZE         size in bytes of each connection buffer (must be > 0)
 *   MAX_CONNECTIONS  number of connection objects to provision (must be > 0,
 *                    default 16 * 4096); rounded up to a multiple of
 *                    MEMPOOL_DEFAULT_CHUNKSIZE
 *
 * Initialises ixev and the connection datastore, spawns one worker thread
 * per CPU except one, and then runs the last worker on the calling thread.
 */
int main(int argc, char *argv[])
{
	int i, nr_cpu;
	pthread_t thread;	/* not named `tid`: that would shadow the thread-local global */
	int ret;
	long size_arg;
	unsigned int pp_conn_pool_entries;

	if (argc < 2) {
		fprintf(stderr, "Usage: %s MSG_SIZE [MAX_CONNECTIONS]\n", argv[0]);
		return -1;
	}

	/* zero or negative would wrap to an unusable unsigned buffer size */
	size_arg = atol(argv[1]);
	if (size_arg <= 0) {
		fprintf(stderr, "invalid MSG_SIZE '%s'\n", argv[1]);
		return -1;
	}
	msg_size = size_arg;

	if (argc >= 3) {
		int entries_arg = atoi(argv[2]);

		if (entries_arg <= 0) {
			fprintf(stderr, "invalid MAX_CONNECTIONS '%s'\n", argv[2]);
			return -1;
		}
		pp_conn_pool_entries = entries_arg;
	} else {
		pp_conn_pool_entries = 16 * 4096;
	}

	pp_conn_pool_entries = ROUND_UP(pp_conn_pool_entries, MEMPOOL_DEFAULT_CHUNKSIZE);

	ret = ixev_init(&pp_conn_ops);
	if (ret) {
		fprintf(stderr, "failed to initialize ixev\n");
		return ret;
	}

	ret = mempool_create_datastore(&pp_conn_datastore, pp_conn_pool_entries, sizeof(struct pp_conn) + msg_size, 0, MEMPOOL_DEFAULT_CHUNKSIZE, "pp_conn");
	if (ret) {
		fprintf(stderr, "unable to create mempool\n");
		return ret;
	}

	nr_cpu = sys_nrcpus();
	if (nr_cpu < 1) {
		fprintf(stderr, "got invalid cpu count %d\n", nr_cpu);
		exit(-1);
	}
	nr_cpu--; /* don't count the main thread */

	sys_spawnmode(true);

	/*
	 * One slot per spawned worker plus one for the main thread, sized from
	 * the actual CPU count so that large machines cannot overflow it. The
	 * array must stay alive while the workers run; it does, because the
	 * pp_main() call below only returns on initialisation failure.
	 */
	int id[nr_cpu + 1];
	for (i = 0; i < nr_cpu; i++) {
		id[i] = i;
		if (pthread_create(&thread, NULL, pp_main, &id[i])) {
			printf("thread %d\n",i);
			fprintf(stderr, "failed to spawn thread %d\n", i);
			exit(-1);
		}
	}

	/* the main thread acts as the last worker */
	id[i] = i;
	pp_main(&id[i]);
	return 0;
}

